Skip to content

PrefectDeployment#170

Merged
chrisguidry merged 9 commits intomainfrom
prefect-deployment
Jun 11, 2025
Merged

PrefectDeployment#170
chrisguidry merged 9 commits intomainfrom
prefect-deployment

Conversation

@chrisguidry
Copy link
Collaborator

@chrisguidry chrisguidry commented Jun 9, 2025

Adds the new PrefectDeployment CRD and a controller for it. This
CRD will create a Prefect Deployment against the given Prefect server,
which requires actually connecting to the running Prefect server for
the first time.

This also adds a Python script to the repository that helps with
generating a PrefectDeployment resource from a given flow, analogous
to the prefect deploy ... command. I'm aiming here to reuse as much
from PrefectHQ/prefect as possible there.

Note: the CRD spec ends up using camelCase names rather than
snake_case in order to retain consistency with the other CRDs and with
Kubernetes convetions.

Much of this was generated by Claude (guided by me) in several stages,
and I'll be reviewing this PR as if it were a fresh contribution.

Closes #164

Co-Authored-By: Claude noreply@anthropic.com

What Claude itself has to say about the changes:

🎯 High-Level Summary: PrefectDeployment CRD Implementation

This PR introduces a major new feature: PrefectDeployment Custom Resource Definition - allowing users to manage Prefect deployments (Python flows)
declaratively through Kubernetes. While the diff is large (~7k lines), the majority is comprehensive test coverage and generated code.

📋 What This PR Delivers

  1. New PrefectDeployment CRD (api/v1/prefectdeployment_types.go)
    - Declarative management of Prefect deployments (Python flows)
    - Supports schedules, parameters, work pools, and server connections
    - Full lifecycle management with proper cleanup via finalizers
  2. Production-Ready Controller (internal/controller/prefectdeployment_controller.go)
    - Reconciles K8s resources with Prefect API deployments
    - Handles creation, updates, deletion, and drift detection
    - Robust error handling and status reporting
  3. Prefect API Client (internal/prefect/client.go)
    - HTTP client for Prefect API operations (deployments, flows)
    - Support for both Prefect Cloud and self-hosted servers
    - Port-forwarding for local development scenarios

🧪 Quality & Testing Focus

The large line count is primarily comprehensive test coverage:

  • 911 lines: PrefectDeployment controller tests (integration scenarios)
  • 665 lines: Prefect API client tests (HTTP mocking, error cases)
  • 595 lines: Conversion logic tests (100% coverage, edge cases)
  • 466 lines: Server reference tests (API keys, URLs, error handling)

🏗️ Architecture Improvements

  1. Better Separation of Concerns
    - Moved API key logic from controller to PrefectServerReference domain object
    - Centralized Prefect API conversion logic in dedicated modules
    - Cleaner client creation patterns
  2. Code Simplification
    - Inlined trivial helper functions to reduce indirection
    - More direct method calls instead of verbose wrapper functions
    - Removed unnecessary abstractions
  3. Robust Error Handling
    - Comprehensive error scenarios in tests
    - Proper finalizer-based cleanup to prevent resource leaks
    - Graceful handling of Prefect API failures

📖 Documentation & Tooling

  • Generated CRD docs (PrefectDeployment.md)
  • Python helper script for generating deployment YAMLs
  • Sample configurations for common use cases

🎯 Review Focus Areas

  1. Core Logic: prefectdeployment_controller.go (~260 lines) - the main reconciliation logic
  2. API Types: prefectdeployment_types.go (~230 lines) - the CRD structure
  3. Client Interface: internal/prefect/client.go - Prefect API integration
  4. Conversion Logic: internal/prefect/convert.go - K8s ↔ Prefect API mapping

Everything else is either tests, generated code, or documentation.

chrisguidry and others added 4 commits June 6, 2025 16:38
- Added comprehensive HTTP client tests with proper organization
- Implemented authentication testing for Prefect Cloud, self-hosted, and empty API keys
- Added error handling tests for network failures, context cancellation, and invalid responses
- Enhanced test coverage for deployment controller error scenarios
- Added API key handling tests from various sources (secrets, configmaps, direct values)
- Fixed Makefile to automatically exclude generated code from coverage
- Updated AGENTS.md with better documentation structure and portforward testing guidance
- Improved overall test consistency across all controllers
- Fixed linting issues with error handling and unused functions

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This change reduces log noise by demoting fine-grained operational details
to DEBUG level while keeping important milestones at INFO level:

- HTTP API calls in Prefect client (16 log statements)
- Routine reconciliation start messages in all controllers
- CreateOrUpdate operation results for Kubernetes resources
- Port-forwarding setup and HTTP/2 disable messages

Important events like migration jobs, API sync milestones, and significant
state changes remain at INFO level. Debug logging can be enabled with --v=1.

Also fixes parameter schema handling in create-prefect-deployment.py by
properly converting ParameterSchema objects to dictionaries using model_dump().

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
DeploymentLabels map[string]string `json:"deploymentLabels,omitempty"`
}

type PrefectServerReference struct {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was hoisted up to be reusable between PrefectWorkPool and PrefectDeployment

Comment on lines +17 to +46
apiVersion: prefect.io/v1
kind: PrefectDeployment
metadata:
name: hello-there
spec:
server:
name: prefect-ephemeral

workPool:
name: process-pool

deployment:
entrypoint: "flows/hello_world.py:hello"
pullSteps:
- prefect.deployments.steps.git_clone:
repository: https://github.com/PrefectHQ/examples.git
branch: main
parameters:
name: "prefect-operator!"
parameterOpenApiSchema:
title: Parameters
type: object
properties:
name:
default: Marvin
position: 0
title: name
type: string
required: []
definitions: {}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a fully working example:

image

image

image

image

Remove obvious comments that just describe what the next line does,
keeping only comments that add actual value like explaining business
logic or non-obvious behavior.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Add PrefectDeploymentFinalizer constant for deployment cleanup
- Modify Reconcile function to handle deletion and ensure finalizer presence
- Implement two-phase reconciliation: finalizer addition then sync
- Add handleDeletion method to clean up deployments from Prefect API
- Fix HTTP status code handling to accept both 200 and 204 for DELETE operations
- Update tests to verify finalizer behavior and deletion workflow
- Fix deprecated Requeue usage in favor of RequeueAfter

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Create convert_test.go with 100% coverage for all conversion functions
- Test ConvertToDeploymentSpec with all conditional paths and error cases
- Cover version info, work queue, parameters, job variables, schemas, pull steps, schedules
- Test error handling for invalid JSON in all unmarshaling operations
- Add edge cases for anchor date parsing and multiple schedules/pull steps
- Test UpdateDeploymentStatus and GetFlowIDFromDeployment functions
- Improve ConvertToDeploymentSpec coverage from 39.2% to 100%
- Ensure robust validation of complex data structures and boundary conditions

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Updates controller tests to handle two-phase reconciliation pattern and
finalizer lifecycle management:

- Add missing import for errors.IsNotFound
- Trigger reconciliation after deletion to process finalizer removal
- Update error scenario tests to expect errors on second reconcile call
- Fix timing issues with finalizer-based deletion in status update test

All 112 tests now pass with comprehensive coverage of convert.go at 100%.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
- Inlined createPrefectClient calls with direct prefect.NewClientFromK8s() usage
- Inlined calculateSpecHash calls with direct utils.Hash() usage
- Moved getAPIKey method from controller to PrefectServerReference.GetAPIKey()
- Added comprehensive test suite for PrefectServerReference with 28 test cases
- Created NewClientFromK8s function combining API key retrieval with client creation
- Removed unused helper methods to reduce code indirection
- Updated controller tests to use utils.Hash directly
- Cleaned up imports (removed unused logr from controller)

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Comment on lines +92 to +100
// Ensure finalizer is present
if !controllerutil.ContainsFinalizer(&deployment, PrefectDeploymentFinalizer) {
controllerutil.AddFinalizer(&deployment, PrefectDeploymentFinalizer)
if err := r.Update(ctx, &deployment); err != nil {
log.Error(err, "Failed to add finalizer", "deployment", deployment.Name)
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: time.Second}, nil
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're using a two-step deletion with a finalizer to allow for deleting the deployment from the Prefect server. This differs from the other controllers, because they are just managing Kubernetes objects, so there's no need for dealing with external objects.

@chrisguidry chrisguidry marked this pull request as ready for review June 9, 2025 19:31
@chrisguidry chrisguidry requested review from a team as code owners June 9, 2025 19:31
baseClient := NewClient("", apiKey, log)

// Determine if we need port-forwarding
needsPortForwarding := !baseClient.isRunningInCluster() && serverRef.IsInCluster()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Non-blocking: I don't fully understand the need for this yet, but I'm happy to proceed with it for now. I think I would have suspected that this controller would reach the Prefect API by either the domain on the Ingress (if external) or by the Service (if internal).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh, yes, great question, this is for local development. When developing the operator, you're running it locally on your laptop against some other cluster (like minikube, etc). Since the operator isn't actually running inside the cluster where the Prefect server is running, we can't access it with cluster-local addresses, so we need to port-forward to access it. Totally open to other approaches here, because this does add a lot of code/complexity.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh roger that. Maybe in the future we can clarify that somehow but sounds good for now!

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@chrisguidry just my two cents here but perhaps you can check out devspace or okteto. These are projects which allow you to sync source code into a running container in a given cluster including reloading/restarting the container if the local source code changes. I guess this could be used to make changes to the controller locally, have it synced and then restarted automatically in the cluster. That way you would not need the code/complexity of port forwarding. I am familiar with devspace and could help you out there, It's relatively easy to setup. Unfortunately, devspace is not actively developer anymore afaik. Hence, the okteto alternative which I have not tried yet.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah thanks for the tip, I'll check that out!

@chrisguidry chrisguidry merged commit 4e1ecda into main Jun 11, 2025
8 checks passed
@chrisguidry chrisguidry deleted the prefect-deployment branch June 11, 2025 13:41
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Prefect Deployment CRD

4 participants